package com.atguigu.realtime.apps

import java.time.LocalDate
import java.util
import java.util.Properties

import com.alibaba.fastjson.JSON
import com.atguigu.common.constants.{PrefixConstant, TopicConstant}
import com.atguigu.common.utils.{JedisUtil, PropertiesUtil}
import com.atguigu.realtime.beans.{ActionLog, CouponAlertInfo, OrderDetail, OrderInfo, ProvinceInfo, SaleDetail, UserInfo}
import com.atguigu.realtime.utils.{DStreamUtil, DateHandleUtil}
import com.google.gson.Gson
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import redis.clients.jedis.Jedis

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks

/**
 * Created by Smexy on 2022/6/29
 *
 *    在SparkStreaming中如果两个流要进行Join有以下前提:
 *        ①只有DS[K,V] 才能Join。把要on后面的字段，作为K
 *              类似：   a join b on a.id = b.id
 *
 *              DS[K,V1] JOIN  DS[K,V2]  , JOIN后  DS[K,(V1,V2)]
 *
 *        ②只有从同一个StreamingContext获取的流，才能Join!
 *
 *    -----------------
 *      at least once + 幂等输出

    ---------------------
        双流Join相关：
              数据无法Join的原因：  要join的数据处于不同的批次被消费到，无法Join！

            解决的核心：   无法Join的数据，如果早到，就写入缓存，如果晚到就到缓存区匹配。

          细节：
                orderInfo 和 orderDetail 是1：N的关系。

              orderInfo：
                          ①和当前批次达到的orderDetail关联
                          ②不能确定后续是否有晚到的orderDetail，需要把自己无条件写入缓存
                          ③不能确定之前批次有没有早到的OrderDetail，需要去缓存查询orderDetail，找到就配对

              orderDetail：
                          ④和当前批次达到的orderInfo关联(只需要做一次)
                          ⑤如果 ①配对不成功，说明当前的orderDetail早到或晚到
                                    去缓存查询是否有早到的orderInfo，如果找到，说明当前的orderDetail晚到了，就配对。
                          ⑥如果找不到，说明orderDetail早到了，就自己写入缓存。等待后续批次的orderInfo匹配。

    --------------------------
            设计在redis中 order_info和order_detail的存储格式:
                  order_info:
                                  使用场景： 根据order_id 找 order_info
                                  选择：
                                  K： 体现order_id
                                  v: string

                  order_detail: 使用场景： 根据order_id 找 多笔已经在缓存存在的order_detail
                                   选择：
                                  K： 体现order_id
                                  v:  set

  ---------------------------
 `order_detail`  LEFT JOIN  order_info : 取到每笔详情的 user_id和province_id，
再去关联，获取user和province的信息


order_detail`  LEFT JOIN  order_info： 是事实表,随时产生事实。流式处理。

user_info:  维度表。 既有 insert 又有 update。实时随时产生数据，流式处理。


province_info: 维度表。 几乎不变。将数据从数据库中查询出来，使用静态集合(List,Map)进行存储。
					全局只需要查一次即可。
 *
 */
object SaleDetailApp extends BaseApp {
  override var appName: String = "SaleDetailApp"
  override var groupId: String = "realtime220212"
  //主要的topic
  override var topic: String = TopicConstant.ORDER_DETAIL
  var topicOrderInfo: String = TopicConstant.ORDER_INFO
  override var batchDuration: Int = 10


  def main(args: Array[String]): Unit = {

    //写入ES，需要在Configuration中添加ES集群的一些参数
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(appName)

    sparkConf.set("es.nodes",PropertiesUtil.getProperty("es.nodes"))
    sparkConf.set("es.port",PropertiesUtil.getProperty("es.port"))
    sparkConf.set("es.index.auto.create", "true")
    //允许使用主机名(域名)访问es节点
    sparkConf.set("es.nodes.wan.only", "true")

    context = new StreamingContext(sparkConf,Seconds(batchDuration))

    //调用一次查询省份   在Driver
    /*
        第一种： driver端变量作为闭包变量，复制给每一个Task
        第二种： 变量只读，可以使用广播变量，复制给每一个Executor

            一个SparkStreaming APP在运行时， 只申请 2个Executor,并行度达到300
     */
    val provinceMap: Map[String, ProvinceInfo] = queryProvince(sparkConf)

    //广播   广播后，在算子中，只能用provinceBC，不能用provinceMap
    val provinceBC: Broadcast[Map[String, ProvinceInfo]] = context.sparkContext.broadcast(provinceMap)

    runApp{

      val orderDS: InputDStream[ConsumerRecord[String, String]] = DStreamUtil.createDS(context, groupId, topicOrderInfo)
      val orderDetailDS: InputDStream[ConsumerRecord[String, String]] = DStreamUtil.createDS(context, groupId, topic)

      var orderDSOffsets: Array[OffsetRange] = null
      var orderDetailDSOffsets: Array[OffsetRange] = null
      //转换，将 DS[T] 转换为 DS[K,V]
      val ds1: DStream[(String, OrderInfo)] = orderDS.transform(rdd => {

        orderDSOffsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        rdd.map(record => {
          val orderInfo: OrderInfo = JSON.parseObject(record.value(), classOf[OrderInfo])

          //封装额外的时间字段
          orderInfo.create_date = DateHandleUtil.parseDateTimeToDate(orderInfo.create_time)
          orderInfo.create_hour = DateHandleUtil.parseDateTimeToHour(orderInfo.create_time)
          (orderInfo.id, orderInfo)

        })


      })

      val ds2: DStream[(String, OrderDetail)] = orderDetailDS.transform(rdd => {

        orderDetailDSOffsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        rdd.map(record => {
          val orderDetail: OrderDetail = JSON.parseObject(record.value(), classOf[OrderDetail])
          (orderDetail.order_id, orderDetail)
        })


      })

      // join后不管数据能否关联成功，都要保留二者的全部
      //val ds3: DStream[(String, (OrderDetail, Option[OrderInfo]))] = ds2.leftOuterJoin(ds1)
      val ds3: DStream[(String, (Option[OrderInfo], Option[OrderDetail]))] = ds1.fullOuterJoin(ds2)

      val ds4: DStream[SaleDetail] = ds3.mapPartitions(partition => {

        val gson = new Gson()

        val jedisClient: Jedis = JedisUtil.getJedisClient

        //准备一个存放封装号的SaleDetail的集合
        val saleDetails = new ListBuffer[SaleDetail]

        partition.foreach {
          case (orderId, (orderInfoOption, orderDetailOption)) => {

            if (orderInfoOption != None) {

              val orderInfo: OrderInfo = orderInfoOption.get

              //①和当前批次达到的orderDetail关联
              if (orderDetailOption != None) {

                val orderDetail: OrderDetail = orderDetailOption.get

                saleDetails.append(new SaleDetail(orderInfo, orderDetail))

              }

              //②不能确定后续是否有晚到的orderDetail，需要把orderInfo无条件写入缓存
              jedisClient.set(PrefixConstant.order_info_redis_preffix + orderId, gson.toJson(orderInfo))

              //③不能确定之前批次有没有早到的OrderDetail，需要去缓存查询orderDetail，找到就配对
              // set类型的数据，如果key不存在，返回 Set()
              val earlyCameOrderDetails: util.Set[String] = jedisClient.smembers(PrefixConstant.order_detail_redis_preffix + orderId)

              earlyCameOrderDetails.forEach(jsonStr => {

                val detail: OrderDetail = JSON.parseObject(jsonStr, classOf[OrderDetail])

                saleDetails.append(new SaleDetail(orderInfo, detail))

              })

            } else {

              //处理的是 OrderInfo 为NONE的数据
              val orderDetail: OrderDetail = orderDetailOption.get

              //⑤如果 ①配对不成功，说明当前的orderDetail早到或晚到去缓存查询是否有早到的orderInfo，如果找到，说明当前的orderDetail晚到了，就配对。
              // 如果string类型，key不存在，返回 null，不是 ""
              val orderInfoStr: String = jedisClient.get(PrefixConstant.order_info_redis_preffix + orderId)

              if (orderInfoStr != null) {

                val info: OrderInfo = JSON.parseObject(orderInfoStr, classOf[OrderInfo])

                saleDetails.append(new SaleDetail(info, orderDetail))
              } else {

                //⑥如果找不到，说明orderDetail早到了，就自己写入缓存。等待后续批次的orderInfo匹配。
                jedisClient.sadd(PrefixConstant.order_detail_redis_preffix + orderId, gson.toJson(orderDetail))

              }


            }


          }
        }

        jedisClient.close()

        saleDetails.iterator


      })

      //关联user 和 province
      val ds5: DStream[SaleDetail] = ds4.mapPartitions(partition => {

        val client: Jedis = JedisUtil.getJedisClient

        val saleDetails: Iterator[SaleDetail] = partition.map(saleDetail => {

          val jsonStr: String = client.get(PrefixConstant.user_info_redis_preffix + saleDetail.user_id)
          val userInfo: UserInfo = JSON.parseObject(jsonStr, classOf[UserInfo])
          //关联了用户
          saleDetail.mergeUserInfo(userInfo)

          //关联省份
          saleDetail.mergeProvinceInfo(provinceBC.value.get(saleDetail.province_id).get)

          saleDetail

        })

        client.close()

        saleDetails

      })

      import org.elasticsearch.spark._
      //写入ES
      ds5.foreachRDD(rdd => {

        rdd.cache()

        println("即将写入:"+rdd.count())

       rdd.saveToEs("realtime2022_sale_detail"+LocalDate.now(),Map("es.mapping.id" -> "order_detail_id"))

        //提交偏移量
        orderDS.asInstanceOf[CanCommitOffsets].commitAsync(orderDSOffsets)
        orderDetailDS.asInstanceOf[CanCommitOffsets].commitAsync(orderDetailDSOffsets)

      })
    }
  }

  def queryProvince(conf:SparkConf):Map[String,ProvinceInfo]={

    val provinces = new mutable.HashMap[String, ProvinceInfo]()

    //去mysql查一张表  用SparkSql方便。
    /*
        用JDBC方式写代码

        RDD.foreachPartition(){
            获取连接

            JDBC方式查询

            关闭连接
        }
     */
    val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    val properties = new Properties()

    properties.setProperty("driver",PropertiesUtil.getProperty("jdbc.driver.name"))
    properties.setProperty("user",PropertiesUtil.getProperty("jdbc.user"))
    properties.setProperty("password",PropertiesUtil.getProperty("jdbc.password"))

    // 在Executor端运行
    val df: DataFrame = sparkSession.read.jdbc(PropertiesUtil.getProperty("jdbc.url"), "base_province", properties)

    // DataFrame 转 DataSet
    import sparkSession.implicits._

    val ds: Dataset[ProvinceInfo] = df.as[ProvinceInfo]

    ds.collect().foreach(province => {
      provinces.put(province.id ,province)
    })

    provinces.toMap

  }
}
